import os
import math
import time
from datetime import datetime

import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
import plotly.graph_objs as go
import plotly.figure_factory as ff
from sklearn.metrics import mean_absolute_percentage_error, mean_squared_error
from math import sqrt
import scipy.stats as stats
from scipy import signal
import requests

import warnings
import logging
logging.disable(logging.CRITICAL)
warnings.filterwarnings("ignore", category=UserWarning)

# ======================================= KERAS LSTM =======================================
from sklearn.preprocessing import MinMaxScaler
from sklearn import preprocessing
from keras.models import Sequential, load_model
from keras.layers import Dense, Dropout, Activation, LSTM
import keras
import h5py
stockcode_name = {
    '03011': 'AMLEX',
    '7204': 'DO',
    '0128': 'FRONTKN',
    '0166': 'INARI',
    '0127': 'JHM',
    '9334': 'KESM',
    '0143': 'KEYASIC',
    '3867': 'MPI',
    '0070': 'MQTECH',
    '4359': 'TURIYA',
    '5005': 'UNISEM',
    '0120': 'VIS',
    '0097': 'VITROX'
}
__path__ = "C:/Users/ji3yi/OneDrive - Universiti Malaya/Research Project/Production Code/data/yahoo_finance/daily/"
os.chdir(__path__)
csv_files = [f for f in os.listdir() if f.endswith('.csv')]
for csv in csv_files:
stockcode = csv.split('.')[0]
for i in stockcode_name.keys():
name = stockcode_name.get(i).lower()
if stockcode == i:
globals()[f'df_{name}'] = pd.read_csv(csv)
exsig_name = {
    'CAPUTLG3344S': 'capacity_utilization_semicon',
    'IY3344': 'export_price_semicon',
    'IZ3344': 'import_price_semicon',
    'PCU33443344': 'ppi_semicon',
    'CPIAUCSL': 'cpi',
    'DEXMAUS': 'myr_to_usd',
    'IPG3344S': 'industrial_production_semicon',
    'INDPRO': 'industrial_production',
    'PCU334413334413A': 'ppi_other_semicon_chips_wafers_heatsinks',
    'COINDUSZ3344': 'import_price_origin',
    'CES3133440001': 'all_employees_semicon',
    'ID8541': 'export_price_index_harmonized',
    'PCU3261993261992': 'ppi_electrical_electronic_plastic',
    'IP8542': 'import_price_index_harmonized',
    'VIXCLS': 'cboe_volatility_index',
    'A34SIS': 'manufacturers_inventories_to_shipments'
}
__path__ = "C:/Users/ji3yi/OneDrive - Universiti Malaya/Research Project/Production Code/data/fred/monthly/"
os.chdir(__path__)
csv_files = [f for f in os.listdir() if f.endswith('.csv')]
df_exsig = pd.DataFrame()
for csv in csv_files:
exsig_code = csv.split('.')[0]
if exsig_code in exsig_name:
name = exsig_name[exsig_code].lower()
csv_path = os.path.join(__path__, csv)
try:
df = pd.read_csv(csv_path)
df = df.rename(columns={exsig_code: name})
globals()[f'df_{name}'] = df
except Exception as e:
print(f"Error reading {csv}: {e}")
# Merge every external signal onto a common monthly date spine.
df_exsig = pd.DataFrame()
df_exsig['ds'] = pd.date_range(start='2018-09-01', end='2023-09-01', freq='MS')
for exsig_code in exsig_name:
    name = exsig_name[exsig_code].lower()
    if f'df_{name}' in globals():
        df_name = globals()[f'df_{name}']
        if df_name['DATE'].duplicated().any():
            df_name.drop_duplicates(subset='DATE', keep='first', inplace=True)
        df_name.rename(columns={'DATE': 'ds'}, inplace=True)
        df_name['ds'] = pd.to_datetime(df_name['ds'])
        df_exsig = pd.merge(df_exsig, df_name, on='ds', how='left')
df_exsig.head()
|   | ds | capacity_utilization_semicon | export_price_semicon | import_price_semicon | ppi_semicon | cpi | myr_to_usd | industrial_production_semicon | industrial_production | ppi_other_semicon_chips_wafers_heatsinks | import_price_origin | all_employees_semicon | export_price_index_harmonized | ppi_electrical_electronic_plastic | import_price_index_harmonized | cboe_volatility_index | manufacturers_inventories_to_shipments |
|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
| 0 | 2018-09-01 | 85.7278 | 89.1 | 79.6 | 55.9 | 252.182 | 4.138789 | 107.5010 | 104.1181 | 51.6 | 95.2 | 369.2 | 103.2 | 131.8 | 64.6 | 12.910526 | 1.78 |
| 1 | 2018-10-01 | 84.0109 | 88.9 | 79.8 | 55.6 | 252.772 | 4.157045 | 106.0938 | 103.9397 | 51.8 | 95.1 | 369.9 | 103.3 | 131.8 | 65.0 | 19.352174 | 1.74 |
| 2 | 2018-11-01 | 81.7101 | 88.8 | 79.8 | 55.6 | 252.594 | 4.182475 | 103.8852 | 104.0007 | 51.8 | 95.2 | 370.5 | 103.2 | 131.8 | 65.0 | 19.389048 | 1.74 |
| 3 | 2018-12-01 | 80.6747 | 88.7 | 79.4 | 55.6 | 252.767 | 4.168844 | 103.2098 | 103.9946 | 51.8 | 95.2 | 371.7 | 102.9 | 131.8 | 65.0 | 24.953158 | 1.78 |
| 4 | 2019-01-01 | 81.4591 | 88.8 | 79.4 | 55.1 | 252.718 | 4.115975 | 104.8024 | 103.3730 | 52.2 | 95.5 | 375.2 | 102.4 | 130.7 | 65.0 | 19.572381 | 1.78 |
def getDataset(stockname, settings):
    """Assemble the modelling frame for one stock: price series plus lagged external signals."""
    data = globals()[f'df_{stockname.lower()}'].copy()
    data['Date'] = pd.to_datetime(data['Date'])
    df_exsig['ds'] = pd.to_datetime(df_exsig['ds'])
    external_signals = {}
    ext_signal_info = settings['hyperparameters']['external_signals']
    for k in ext_signal_info.keys():
        if k == 'external_signals_combo':
            continue  # configuration key, not a signal; skip it so later keys are still processed
        external_signals[k] = df_exsig[['ds', k]].copy()
        external_signals[k].rename(columns={ext_signal_info[k]['date_col']: 'date_col',
                                            ext_signal_info[k]['value_col']: k}, inplace=True)
    for k in external_signals.keys():
        # shift each monthly signal forward by its configured lag, then key it back to a timestamp
        external_signals[k]['month'] = pd.PeriodIndex(external_signals[k]['date_col'], freq='M') + ext_signal_info[k]['lags']
        external_signals[k]['ds'] = pd.PeriodIndex(external_signals[k]['month']).to_timestamp()
        external_signals[k] = external_signals[k][['ds', k]]
    merged_cols = set()
    for k in external_signals.keys():
        if k not in merged_cols:
            data = data.merge(external_signals[k], how='left', left_on='Date', right_on='ds')
            data.drop(columns='ds', inplace=True)  # drop the merge key copied from the signal frame
            merged_cols.add(k)
    data.rename(columns={'Date': 'ds', 'Adj Close': 'y'}, inplace=True)
    rel_columns = [col for col in data.columns if col in external_signals.keys() or
                   col in ['ds', 'y', 'year', 'month']]
    data = data[rel_columns]
    return data
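# Illustrative sketch (not part of the original pipeline): how the PeriodIndex
# lag shift inside getDataset() aligns a monthly signal with later stock dates.
# The 2-month lag and CPI values here are made up for demonstration.
demo_sig = pd.DataFrame({'date_col': pd.to_datetime(['2023-01-01', '2023-02-01']),
                         'cpi': [299.2, 301.5]})
demo_sig['month'] = pd.PeriodIndex(demo_sig['date_col'], freq='M') + 2  # lag of 2 months
demo_sig['ds'] = pd.PeriodIndex(demo_sig['month']).to_timestamp()
# January's CPI is now keyed to 2023-03-01, so a left merge on 'ds' pairs each
# stock date with the signal value published two months earlier.
print(demo_sig[['ds', 'cpi']])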
def wmape(y_true, y_pred):
    """Weighted MAPE: avoids MAPE's division by zero when individual actuals are zero."""
    y_true = np.nan_to_num(y_true, nan=0)
    y_pred = np.nan_to_num(y_pred, nan=0)
    return round((np.abs(y_true - y_pred).sum() / np.abs(y_true).sum()), 4)
def mae(y_true, y_pred):
    """Adapted MAE: averages over the non-zero actuals, guarding against division by zero."""
    y_true = np.nan_to_num(y_true, nan=0)
    y_pred = np.nan_to_num(y_pred, nan=0)
    return round((np.abs(y_true - y_pred).sum() / max(np.count_nonzero(y_true), 1)), 4)
def rmse(y_true, y_pred):
    """Adapted RMSE: averages over the non-zero actuals, guarding against division by zero."""
    y_true = np.nan_to_num(y_true, nan=0)
    y_pred = np.nan_to_num(y_pred, nan=0)
    return round(np.sqrt((np.square(y_true - y_pred).sum()) / max(np.count_nonzero(y_true), 1)), 4)
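# Quick sanity check of the three metric helpers on toy numbers (illustrative
# only; the values below are not from any stock).
y_true_demo = np.array([1.0, 2.0, 0.0, 4.0])
y_pred_demo = np.array([1.1, 1.8, 0.0, 4.4])
print(wmape(y_true_demo, y_pred_demo))  # sum|err| / sum|y_true| = 0.7 / 7 = 0.1
print(mae(y_true_demo, y_pred_demo))    # 0.7 / 3 non-zero actuals ≈ 0.2333
print(rmse(y_true_demo, y_pred_demo))   # sqrt(0.21 / 3) ≈ 0.2646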
def create_dataset(dataset, look_back=1):
    """Slice a scaled series into (X, y) windows: X holds `look_back` lags, y the next value."""
    dataX, dataY = [], []
    for i in range(len(dataset) - look_back - 1):
        a = dataset[i:(i + look_back), 0]
        dataX.append(a)
        dataY.append(dataset[i + look_back, 0])
    return np.array(dataX), np.array(dataY)
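# Worked example of the sliding window create_dataset() builds. With
# look_back=2, each X row holds two consecutive scaled values and y is the
# value that immediately follows.
toy = np.arange(6, dtype=float).reshape(-1, 1)  # [[0], [1], [2], [3], [4], [5]]
X_demo, y_demo = create_dataset(toy, look_back=2)
# X_demo -> [[0, 1], [1, 2], [2, 3]], y_demo -> [2, 3, 4]
# (len(dataset) - look_back - 1 samples, so the last point is never a target)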
def categorize(array):
    if array < 30:
        return 'Low'      # oversold condition, a potential price increase
    elif array < 70:
        return 'Medium'   # neutral, no strong trend
    elif array >= 70:
        return 'High'     # overbought condition, a potential price decrease
    else:
        return np.nan     # NaN RSI (e.g. the first row, where diff() is undefined)
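# The RSI buckets drive the marker colours in the decision plots below:
for rsi_demo in [25.0, 50.0, 70.0, 85.0, np.nan]:
    print(rsi_demo, '->', categorize(rsi_demo))
# 25 -> Low (oversold), 50 -> Medium, 70 and 85 -> High (overbought), NaN -> NaN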
def evaluate(model, test_features, test_labels):
    predictions = model.predict(test_features)
    errors = abs(predictions - test_labels)
    mape = 100 * np.mean(errors / test_labels)
    accuracy = 100 - mape
    w_mape = wmape(test_labels, predictions)  # reuse the module-level WMAPE helper
    print()
    print('Model Performance:')
    print('Average Error: {:0.4f}'.format(np.mean(errors)))
    print('Accuracy = {:0.2f}%.'.format(accuracy))
    print('Test WMAPE: {:0.4f}'.format(w_mape))
    return accuracy
def split_train_test(data, look_back=None):
    """Scale the target to [0, 1], split 70/30 chronologically, and build LSTM windows."""
    data.set_index('ds', inplace=True)
    data['ds'] = data.index
    data['ds'] = pd.to_datetime(data['ds'])
    min_max_scaler = preprocessing.MinMaxScaler(feature_range=(0, 1))
    dataset = min_max_scaler.fit_transform(data['y'].values.reshape(-1, 1))
    train_size = int(len(dataset) * 0.7)
    test_size = len(dataset) - train_size
    train, test = dataset[0:train_size, :], dataset[train_size:len(dataset), :]
    x_train, y_train = create_dataset(train, look_back=look_back)
    x_test, y_test = create_dataset(test, look_back=look_back)
    # LSTM expects (samples, timesteps, features); x_test is reshaped later in train_predict_lstm
    x_train = np.reshape(x_train, (x_train.shape[0], 1, x_train.shape[1]))
    return train, test, x_train, y_train, x_test, y_test, min_max_scaler, train_size, test_size
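# Shape check on a synthetic series (illustrative; the column names match the
# pipeline's 'ds'/'y' convention). 100 points split 70/30 with look_back=1:
demo_df = pd.DataFrame({'ds': pd.date_range('2020-01-01', periods=100),
                        'y': np.random.rand(100)})
_, _, xtr, ytr, xte, yte, _, n_tr, n_te = split_train_test(demo_df, look_back=1)
print(xtr.shape, xte.shape)  # (68, 1, 1) and (28, 1): each split loses look_back + 1 rows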
def train_predict_lstm(x_train, x_test, y_train, y_test, look_back=None, min_max_scaler=None):
    model = Sequential()
    model.add(LSTM(20, input_shape=(1, look_back)))
    model.add(Dense(1))
    model.compile(loss='mean_squared_error', optimizer='adam')
    model.fit(x_train, y_train, epochs=20, batch_size=1, verbose=0)
    trainPredict = model.predict(x_train, verbose=0)
    x_test = np.reshape(x_test, (x_test.shape[0], 1, x_test.shape[1]))
    testPredict = model.predict(x_test, verbose=0)
    trainPredict = trainPredict.reshape(-1, 1)
    testPredict = testPredict.reshape(-1, 1)
    # invert the MinMax scaling so errors are in price units
    trainPredict = min_max_scaler.inverse_transform(trainPredict)
    trainY = min_max_scaler.inverse_transform([y_train])
    testPredict = min_max_scaler.inverse_transform(testPredict)
    testY = min_max_scaler.inverse_transform([y_test])
    # per-split error metrics (print or return these as needed)
    trainRMSE = rmse(trainY[0], trainPredict[:, 0])
    testRMSE = rmse(testY[0], testPredict[:, 0])
    trainMAE = mae(trainY[0], trainPredict[:, 0])
    testMAE = mae(testY[0], testPredict[:, 0])
    trainWMAPE = wmape(trainY[0], trainPredict[:, 0])
    testWMAPE = wmape(testY[0], testPredict[:, 0])
    return trainPredict, trainY, testPredict, testY
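# Possible extension (a sketch, not part of the original run): the load_model
# and h5py imports at the top suggest model persistence. One way to use them,
# with a hypothetical per-stock filename, would be to save inside the function:
#     model.save(f'lstm_{stockname}.h5')          # HDF5 format, needs h5py
#     model = load_model(f'lstm_{stockname}.h5')  # restores weights + architecture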
def plot_prediction_static(stockname, train, test, trainPredict, testPredict, look_back=None, min_max_scaler=None):
    # rebuild the full scaled series from the split halves (the original relied on globals here)
    dataset = np.concatenate([train, test])
    actual_data = np.empty_like(dataset)
    actual_data[:, :] = np.nan
    actual_data[:len(train), :] = min_max_scaler.inverse_transform(train)
    actual_data[len(train):len(dataset), :] = min_max_scaler.inverse_transform(test)
    plt.plot(actual_data[:len(train)], label='actual train')
    plt.plot(range(len(train), len(dataset)), actual_data[len(train):], label='actual test')
    # offset the prediction curves so they line up with the dates they forecast
    trainPredictPlot = np.empty_like(dataset)
    trainPredictPlot[:, :] = np.nan
    trainPredictPlot[look_back:len(trainPredict) + look_back, :] = trainPredict
    testPredictPlot = np.empty_like(dataset)
    testPredictPlot[:, :] = np.nan
    testPredictPlot[len(trainPredict) + (look_back * 2) + 1:len(dataset) - 1, :] = testPredict
    plt.plot(trainPredictPlot, label='predicted train')
    plt.plot(testPredictPlot, label='predicted test')
    plt.legend()
    plt.title(f'LSTM {stockname.upper()}')
    plt.show()
def plot_decision_static(stockname, df_plot_train, df_plot_test):
    category_colors = {'Low': 'green', 'Medium': 'orange', 'High': 'red'}
    category_markers = {'Low': 'o', 'Medium': '^', 'High': 's'}
    plt.figure(figsize=(8, 6))
    df = pd.concat([df_plot_train, df_plot_test], axis=0)
    for category, group in df.groupby('category'):
        plt.scatter(group.index, group['yhat1'], label=category, color=category_colors[category],
                    marker=category_markers[category], s=50)
    plt.xlabel('Date')
    plt.ylabel('Predicted Adjusted Close Price')
    plt.title(f'Predicted Adjusted Close Prices by Category for {stockname.upper()}')
    plt.legend()
    plt.grid(True)
    plt.show()
def calculate_rsi(y_train, y_test, trainPredict, testPredict, train_size, test_size):
    # note: relies on the module-level `data` and `min_max_scaler` set in the main loop
    train = pd.DataFrame(min_max_scaler.inverse_transform(y_train.reshape(-1, 1)), columns=['y'])
    test = pd.DataFrame(min_max_scaler.inverse_transform(y_test.reshape(-1, 1)), columns=['y'])
    train_pred = pd.DataFrame(min_max_scaler.inverse_transform(trainPredict.reshape(-1, 1)), columns=['yhat1'])
    test_pred = pd.DataFrame(min_max_scaler.inverse_transform(testPredict.reshape(-1, 1)), columns=['yhat1'])
    train['ds'] = data.iloc[:train_size, data.columns.get_loc('ds')].reset_index(drop=True)
    test['ds'] = data.iloc[train_size + 1:, data.columns.get_loc('ds')].reset_index(drop=True)
    train_pred['ds'] = data.iloc[:train_size, data.columns.get_loc('ds')].reset_index(drop=True)
    test_pred['ds'] = data.iloc[train_size + 1:, data.columns.get_loc('ds')].reset_index(drop=True)
    train.set_index('ds', inplace=True)
    test.set_index('ds', inplace=True)
    train_pred.set_index('ds', inplace=True)
    test_pred.set_index('ds', inplace=True)
    df_plot_train = pd.concat([train, train_pred], axis=1)
    df_plot_test = pd.concat([test, test_pred], axis=1)
    # ================================================== TRAIN =======================================================
    n = 7  # RSI window (simple rolling means, not Wilder's smoothing)
    df_plot_train['y_diff'] = df_plot_train['y'].diff()
    df_plot_train['avgU'] = df_plot_train['y_diff'].apply(lambda x: x if x > 0 else 0)
    df_plot_train['avgD'] = df_plot_train['y_diff'].apply(lambda x: abs(x) if x < 0 else 0)
    avgU = df_plot_train['avgU'].rolling(window=n, min_periods=1).mean()
    avgD = df_plot_train['avgD'].rolling(window=n, min_periods=1).mean()
    rel_strength = avgU / avgD  # avgD == 0 yields inf, which maps to RSI = 100 below
    RSI = 100 - (100 / (1 + rel_strength))
    df_plot_train['RSI'] = RSI
    df_plot_train['category'] = df_plot_train['RSI'].apply(categorize)
    # ================================================== TEST ========================================================
    df_plot_test['y_diff'] = df_plot_test['y'].diff()
    df_plot_test['avgU'] = df_plot_test['y_diff'].apply(lambda x: x if x > 0 else 0)
    df_plot_test['avgD'] = df_plot_test['y_diff'].apply(lambda x: abs(x) if x < 0 else 0)
    avgU = df_plot_test['avgU'].rolling(window=n, min_periods=1).mean()
    avgD = df_plot_test['avgD'].rolling(window=n, min_periods=1).mean()
    rel_strength = avgU / avgD
    RSI = 100 - (100 / (1 + rel_strength))
    df_plot_test['RSI'] = RSI
    df_plot_test['category'] = df_plot_test['RSI'].apply(categorize)
    return df_plot_train, df_plot_test
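# Worked example of the RSI arithmetic above (simple 7-day rolling means, not
# Wilder's exponential smoothing). If the mean gain over the window is 0.6 and
# the mean loss is 0.3:
#     RS  = avgU / avgD = 0.6 / 0.3 = 2.0
#     RSI = 100 - 100 / (1 + 2.0) ≈ 66.7  -> 'Medium' under categorize()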
def plot_decision_interactive(stockname, df_plot_train, df_plot_test):
    matplotlib_to_plotly_symbols = {
        'o': 'circle',
        '^': 'triangle-up',
        's': 'square'
    }
    category_markers = {'Low': 'o', 'Medium': '^', 'High': 's'}
    category_plotly_symbols = {k: matplotlib_to_plotly_symbols[v] for k, v in category_markers.items()}
    category_colors = {'Low': 'green', 'Medium': 'orange', 'High': 'red'}
    df = df_plot_test.copy()
    traces = []
    for category, group in df.groupby('category'):
        trace = go.Scatter(
            x=group.index,
            y=group['yhat1'],
            mode='markers',
            name=category,
            marker=dict(
                color=category_colors[category],
                symbol=category_plotly_symbols[category],
                size=10,
            ),
        )
        traces.append(trace)
    secondary_trace = go.Scatter(
        x=df.index,
        y=df['RSI'],
        name='RSI',
        yaxis='y2',
    )
    traces.append(secondary_trace)
    layout = go.Layout(
        title=f'Predicted Adjusted Close Price from {min(df.index.date)} to {max(df.index.date)} for {stockname.upper()}',
        xaxis=dict(title='Date'),
        yaxis=dict(title='Predicted Adjusted Close Price'),
        yaxis2=dict(
            title='RSI',
            overlaying='y',
            side='right',
            position=0.98,
        ),
        showlegend=True,
    )
    fig = go.Figure(data=traces, layout=layout)
    fig.show()
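# Optional (a sketch, not in the original run): persist the interactive chart
# instead of only displaying it; write_html is a standard plotly Figure method.
#     fig.write_html(f'decision_{stockname}.html')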
stock_lookback_pairs = {
    'amlex': 1,
    'do': 1,
    'frontkn': 1,
    'inari': 1,
    'jhm': 1,
    'kesm': 1,
    'keyasic': 1,
    'mpi': 1,
    'mqtech': 5,
    'turiya': 5,
    'unisem': 1,
    'vis': 1,
    'vitrox': 1,
}
for stockname, look_back in stock_lookback_pairs.items():
    settings = {
        "hyperparameters": {
            "external_signals": {
                "external_signals_combo": [
                    []
                ]
            }
        }
    }
    data = getDataset(stockname=stockname, settings=settings)
    # pass each stock's tuned look-back instead of hard-coding 1 (the original ignored the pairs above);
    # min_max_scaler is returned by split_train_test, so no separate scaler is needed here
    train, test, x_train, y_train, x_test, y_test, min_max_scaler, train_size, test_size = split_train_test(data, look_back=look_back)
    trainPredict, trainY, testPredict, testY = train_predict_lstm(x_train, x_test, y_train, y_test, look_back=look_back, min_max_scaler=min_max_scaler)
    # plot_prediction_static(stockname, train, test, trainPredict, testPredict, look_back=look_back, min_max_scaler=min_max_scaler)
    df_plot_train, df_plot_test = calculate_rsi(y_train, y_test, trainPredict, testPredict, train_size, test_size)
    # plot_decision_static(stockname, df_plot_train, df_plot_test)
    plot_decision_interactive(stockname, df_plot_train, df_plot_test)
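# Possible extension (illustrative sketch, names are hypothetical): collect the
# out-of-sample error per stock into one table by reusing the arrays that
# train_predict_lstm() already returns, e.g. inside the loop above:
#     results.append({'stock': stockname,
#                     'test_wmape': wmape(testY[0], testPredict[:, 0])})
# and afterwards: pd.DataFrame(results).sort_values('test_wmape')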